package org.codehaus.activemq.ra;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQSession;

public class ActiveMQManagedConnection implements ManagedConnection {
	private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
	private PrintWriter logWriter;
	private Subject subject;
	private ActiveMQConnectionRequestInfo info;
	private ArrayList listeners = new ArrayList();
	private Connection physicalConnection;
	private Session physicalSession;
	private ArrayList proxyConnections = new ArrayList();
	private XAResource xaresource = null;

	public Connection getPhysicalConnection() {
		return this.physicalConnection;
	}

	public Session getPhysicalSession() {
		return this.physicalSession;
	}

	public ActiveMQManagedConnection(Subject subject, ActiveMQResourceAdapter adapter,
			ActiveMQConnectionRequestInfo info) throws ResourceException {
		this.subject = subject;
		this.info = info;
		this.physicalConnection = adapter.getPhysicalConnection();
		createSession();
	}

	private void createSession() throws ResourceException {
		try {
			this.physicalSession = this.physicalConnection.createSession(true, 0);

			if ((this.physicalSession instanceof ActiveMQSession)) {
				ActiveMQSession session = (ActiveMQSession) this.physicalSession;
				LocalTransactionEventListener l = createLocalTransactionEventListener();
				session.setLocalTransactionEventListener(l);
			} else {
				log.trace("Cannot register LocalTransactionEventLister on non-ActiveMQ session");
			}

			if ((this.physicalSession instanceof XASession))
				this.xaresource = ((XASession) this.physicalSession).getXAResource();
			else {
				this.xaresource = null;
			}
		} catch (JMSException e) {
			throw new ResourceException("Could not create a new session.", e);
		}
	}

	private LocalTransactionEventListener createLocalTransactionEventListener() {
		return new LocalTransactionEventListener() {
			public void beginEvent() {
				ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 2);
				Iterator iterator = ActiveMQManagedConnection.this.listeners.iterator();
				while (iterator.hasNext()) {
					ConnectionEventListener l = (ConnectionEventListener) iterator.next();

					l.localTransactionStarted(event);
				}
			}

			public void commitEvent() {
				ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 3);
				Iterator iterator = ActiveMQManagedConnection.this.listeners.iterator();
				while (iterator.hasNext()) {
					ConnectionEventListener l = (ConnectionEventListener) iterator.next();

					l.localTransactionCommitted(event);
				}
			}

			public void rollbackEvent() {
				ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, 4);
				Iterator iterator = ActiveMQManagedConnection.this.listeners.iterator();
				while (iterator.hasNext()) {
					ConnectionEventListener l = (ConnectionEventListener) iterator.next();

					l.localTransactionRolledback(event);
				}
			}
		};
	}

	public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
		JMSConnectionProxy proxy = new JMSConnectionProxy(this);
		this.proxyConnections.add(proxy);
		return proxy;
	}

	private boolean isDestroyed() {
		return this.physicalConnection == null;
	}

	public void destroy() throws ResourceException {
		if (isDestroyed()) {
			return;
		}

		cleanup();
		try {
			this.physicalSession.close();
			this.physicalConnection = null;
		} catch (JMSException e) {
			log.info("Error occured during close of a JMS connection.", e);
		}
	}

	public void cleanup() throws ResourceException {
		if (isDestroyed()) {
			return;
		}

		Iterator iterator = this.proxyConnections.iterator();
		while (iterator.hasNext()) {
			JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
			proxy.cleanup();
			iterator.remove();
		}

		try {
			this.physicalSession.close();
			this.physicalSession = null;
		} catch (JMSException e) {
			throw new ResourceException("Could close the JMS session.", e);
		}

		createSession();
	}

	public void associateConnection(Object connection) throws ResourceException {
		throw new ResourceException("Not supported.");
	}

	public void addConnectionEventListener(ConnectionEventListener listener) {
		this.listeners.add(listener);
	}

	public void removeConnectionEventListener(ConnectionEventListener listener) {
		this.listeners.remove(listener);
	}

	public XAResource getXAResource() throws ResourceException {
		if (this.xaresource == null) {
			throw new ResourceException("This is not an XA connection.");
		}

		return new XAResource() {
			public void commit(Xid arg0, boolean arg1) throws XAException {
				ActiveMQManagedConnection.this.xaresource.commit(arg0, arg1);
			}

			public void end(Xid arg0, int arg1) throws XAException {
				ActiveMQManagedConnection.this.xaresource.end(arg0, arg1);
			}

			public void forget(Xid arg0) throws XAException {
				ActiveMQManagedConnection.this.xaresource.forget(arg0);
			}

			public int getTransactionTimeout() throws XAException {
				return ActiveMQManagedConnection.this.xaresource.getTransactionTimeout();
			}

			public boolean isSameRM(XAResource arg0) throws XAException {
				return ActiveMQManagedConnection.this.xaresource.isSameRM(arg0);
			}

			public int prepare(Xid arg0) throws XAException {
				return ActiveMQManagedConnection.this.xaresource.prepare(arg0);
			}

			public Xid[] recover(int arg0) throws XAException {
				return ActiveMQManagedConnection.this.xaresource.recover(arg0);
			}

			public void rollback(Xid arg0) throws XAException {
				ActiveMQManagedConnection.this.xaresource.rollback(arg0);
			}

			public boolean setTransactionTimeout(int arg0) throws XAException {
				return ActiveMQManagedConnection.this.xaresource.setTransactionTimeout(arg0);
			}

			public void start(Xid arg0, int arg1) throws XAException {
				ActiveMQManagedConnection.this.xaresource.start(arg0, arg1);
			}
		};
	}

	public LocalTransaction getLocalTransaction() throws ResourceException {
		return new LocalTransaction() {
			public void begin() {
			}

			public void commit() throws ResourceException {
				try {
					ActiveMQManagedConnection.this.physicalSession.commit();
				} catch (JMSException e) {
					throw new ResourceException("commit failed.", e);
				}
			}

			public void rollback() throws ResourceException {
				try {
					ActiveMQManagedConnection.this.physicalSession.rollback();
				} catch (JMSException e) {
					throw new ResourceException("rollback failed.", e);
				}
			}
		};
	}

	public ManagedConnectionMetaData getMetaData() throws ResourceException {
		return new ManagedConnectionMetaData() {
			public String getEISProductName() throws ResourceException {
				if (ActiveMQManagedConnection.this.physicalConnection == null)
					throw new ResourceException("Not connected.");
				try {
					return ActiveMQManagedConnection.this.physicalConnection.getMetaData().getJMSProviderName();
				} catch (JMSException e) {
					throw new ResourceException("Error accessing provider.", e);
				}

			}

			public String getEISProductVersion() throws ResourceException {
				if (ActiveMQManagedConnection.this.physicalConnection == null)
					throw new ResourceException("Not connected.");
				try {
					return ActiveMQManagedConnection.this.physicalConnection.getMetaData().getProviderVersion();
				} catch (JMSException e) {
					throw new ResourceException("Error accessing provider.", e);
				}
				
			}

			public int getMaxConnections() throws ResourceException {
				if (ActiveMQManagedConnection.this.physicalConnection == null) {
					throw new ResourceException("Not connected.");
				}
				return 2147483647;
			}

			public String getUserName() throws ResourceException {
				if (ActiveMQManagedConnection.this.physicalConnection == null)
					throw new ResourceException("Not connected.");
				try {
					return ActiveMQManagedConnection.this.physicalConnection.getClientID();
				} catch (JMSException e) {
					throw new ResourceException("Error accessing provider.", e);
				}
				
			}
		};
	}

	public void setLogWriter(PrintWriter logWriter) throws ResourceException {
		this.logWriter = logWriter;
	}

	public PrintWriter getLogWriter() throws ResourceException {
		return this.logWriter;
	}

	public boolean matches(Subject subject, ConnectionRequestInfo info) {
		if (info == null) {
			return false;
		}
		if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
			return false;
		}

		if (((subject == null ? 1 : 0) ^ (this.subject == null ? 1 : 0)) != 0) {
			return false;
		}
		if ((subject != null) && (!subject.equals(this.subject))) {
			return false;
		}

		return info.equals(this.info);
	}

	public void proxyClosedEvent(JMSConnectionProxy proxy) {
		this.proxyConnections.remove(proxy);
		proxy.cleanup();

		ConnectionEvent event = new ConnectionEvent(this, 1);

		event.setConnectionHandle(proxy);
		Iterator iterator = this.listeners.iterator();
		while (iterator.hasNext()) {
			ConnectionEventListener l = (ConnectionEventListener) iterator.next();

			l.connectionClosed(event);
		}
	}
}